package com.dc.schedule.server.service.task.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import com.dc.schedule.api.context.FireTaskContext;
import com.dc.schedule.api.status.TaskExecutionStatus;
import com.dc.schedule.server.base.TransactionalExecutor;
import com.dc.schedule.server.config.ServerConfig;
import com.dc.schedule.server.domain.StaticTaskEntity;
import com.dc.schedule.server.domain.TaskExecutionEntity;
import com.dc.schedule.server.repository.StaticTaskRepository;
import com.dc.schedule.server.repository.TaskExecutionRepository;
import com.dc.schedule.server.service.client.impl.ClientHandler;
import com.dc.schedule.server.service.task.TaskManageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.orm.ObjectOptimisticLockingFailureException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * {@link TaskManageService} implementation that keeps one in-memory cron schedule per task key,
 * fires every schedule on one randomly chosen registered client, and records each firing as a
 * {@link TaskExecutionEntity}.
 *
 * <p>Threading: {@link #scheduledTaskMap} is only structurally modified while holding {@link #lock};
 * every firing of a schedule runs while holding the monitor of its {@code ScheduledTask}.
 *
 * @author Diamon.Cheng
 * @date 2022/8/3
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class TaskManageServiceImpl implements TaskManageService {

    /**
     * Upper bound (in characters) used when building the message stored for a failed firing.
     */
    private static final int FIRE_FAILURE_MESSAGE_LIMIT = 1020;

    /**
     * Switch for the "is a previous execution still running" check. It is disabled because an
     * interrupted execution could stay in a running status forever and block the task; a liveness
     * probe for such executions has to exist before this can be turned on.
     */
    private static final boolean RUNNING_EXECUTION_CHECK_ENABLED = false;

    private final TaskScheduler taskScheduler;
    /**
     * taskKey -> schedule information of that task
     */
    private final Map<String, ScheduledTask> scheduledTaskMap = new ConcurrentHashMap<>(3);

    /**
     * Guards every structural change of {@link #scheduledTaskMap}.
     */
    private final Object lock = new Object();

    private final TransactionalExecutor transactionalExecutor;
    private final StaticTaskRepository staticTaskRepository;
    private final TaskExecutionRepository taskExecutionRepository;

    /**
     * Registers a client for the given tasks. A task that has no schedule yet is scheduled first;
     * afterwards the client is added to the task's client map.
     *
     * @param staticTasks   tasks the client can execute
     * @param clientHandler callback used to reach the client
     */
    @Override
    public void registerClientTask(List<StaticTaskEntity> staticTasks, ClientHandler clientHandler) {
        synchronized (lock) {
            for (StaticTaskEntity staticTask : staticTasks) {
                // All writers hold `lock`, so a plain get/put is race free here and keeps the
                // repository and scheduler calls out of ConcurrentHashMap.computeIfAbsent.
                ScheduledTask scheduledTask = scheduledTaskMap.get(staticTask.getTaskKey());
                if (scheduledTask == null) {
                    scheduledTask = new ScheduledTask();
                    scheduledTask.setClients(Collections.synchronizedMap(new LinkedHashMap<>(3)));
                    scheduledTask.setTaskKey(staticTask.getTaskKey());
                    scheduledTask.setTaskName(staticTask.getTaskName());
                    scheduledTask.setConfigVersion(staticTask.getConfigVersion());
                    scheduleTask(staticTask, scheduledTask);
                    scheduledTaskMap.put(staticTask.getTaskKey(), scheduledTask);
                }
                scheduledTask.getClients().put(clientHandler.getClientKey(), clientHandler);
            }
        }
    }

    /**
     * Builds the cron trigger from the task configuration, schedules the task and writes the
     * outcome (next fire time, or the error message when scheduling failed) back to the stored task.
     * A scheduling failure is logged and recorded, not rethrown.
     *
     * @param staticTask    task configuration providing the trigger expression
     * @param scheduledTask in-memory schedule to (re)start
     */
    private void scheduleTask(StaticTaskEntity staticTask, ScheduledTask scheduledTask) {
        try {
            scheduledTask.setTrigger(new CronTrigger(staticTask.getTriggerExpression()));
            staticTask.setNextFireTime(scheduleTask(scheduledTask));
            staticTask.setErrorMessage(null);
        } catch (Exception e) {
            log.warn("调度任务失败", e);
            staticTask.setErrorMessage(CharSequenceUtil.maxLength(e.getMessage(), 250));
        } finally {
            persistScheduleOutcome(staticTask);
        }
    }

    /**
     * Copies the next fire time and error message of {@code staticTask} onto the stored task.
     *
     * @param staticTask task carrying the values to store
     */
    private void persistScheduleOutcome(StaticTaskEntity staticTask) {
        staticTaskRepository.findById(staticTask.getId()).ifPresentOrElse(stored -> {
            stored.setNextFireTime(staticTask.getNextFireTime());
            stored.setErrorMessage(staticTask.getErrorMessage());
            // Save explicitly: without an open transaction the loaded entity is not flushed on its
            // own and the update would be lost.
            staticTaskRepository.save(stored);
        }, () -> transactionalExecutor.executeRunnable(() -> {
            // NOTE(review): fallback kept from the original code; presumably meant for a row that
            // the current context cannot see yet - confirm.
            final StaticTaskEntity stored = staticTaskRepository.getReferenceById(staticTask.getId());
            stored.setNextFireTime(staticTask.getNextFireTime());
            stored.setErrorMessage(staticTask.getErrorMessage());
        }));
    }

    /**
     * Hands the task to the {@link TaskScheduler} using the trigger already set on it.
     *
     * @param scheduledTask schedule whose trigger has been set
     * @return the first fire time computed from the trigger, as returned by the trigger
     */
    private Date scheduleTask(ScheduledTask scheduledTask) {
        final CronTrigger trigger = scheduledTask.getTrigger();
        final SimpleTriggerContext triggerContext = new SimpleTriggerContext();
        final Date nextFireTime = trigger.nextExecutionTime(triggerContext);
        // Publish the firing state BEFORE handing the task to the scheduler, so a firing that
        // starts right away never observes a missing trigger context or fire time.
        scheduledTask.setTriggerContext(triggerContext);
        scheduledTask.setFireTime(nextFireTime);
        log.info("添加调度任务 {}", scheduledTask.getTaskKey());
        final ScheduledFuture<?> future = taskScheduler.schedule(() -> {
            synchronized (scheduledTask) {
                this.fireTask(scheduledTask);
            }
        }, trigger);
        scheduledTask.setFuture(future);
        return nextFireTime;
    }

    /**
     * Scheduler callback: advances the task's own trigger context to the next fire time and then
     * fires the task for the fire time that has just been reached.
     *
     * @param scheduledTask schedule that is due
     */
    private void fireTask(ScheduledTask scheduledTask) {
        final SimpleTriggerContext triggerContext = scheduledTask.getTriggerContext();
        final Date expectFireTimeDate = scheduledTask.getFireTime();
        final Date actualExecutionTime = new Date(triggerContext.getClock().millis());
        triggerContext.update(expectFireTimeDate, actualExecutionTime, actualExecutionTime);
        final Date nextFireTime = scheduledTask.getTrigger().nextExecutionTime(triggerContext);
        scheduledTask.setFireTime(nextFireTime);
        log.debug(
                "[调度任务] 触发：TaskKey:{},Version:{},expectFireDate:{}, nextFireDate:{}",
                scheduledTask.getTaskKey(),
                scheduledTask.getConfigVersion(),
                DateUtil.formatDateTime(expectFireTimeDate),
                DateUtil.formatDateTime(nextFireTime)
        );
        fireTask(scheduledTask, expectFireTimeDate, nextFireTime);
    }

    /**
     * Fires the task on one randomly chosen client. An execution row with a key derived from the
     * task key and the expected fire time is saved first; if that save is rejected with a
     * {@link DataIntegrityViolationException} the firing is skipped. The result of the firing is
     * then stored on the execution and the fire times are stored on the task.
     *
     * @param scheduledTask      schedule being fired
     * @param expectFireTimeDate fire time this firing belongs to
     * @param nextFireTime       fire time of the following firing
     */
    private void fireTask(ScheduledTask scheduledTask, Date expectFireTimeDate, Date nextFireTime) {
        // Copy the clients once and decide on the copy: checking isEmpty() on the live map and
        // copying afterwards could leave an empty list for the random pick.
        final List<ClientHandler> candidates = new ArrayList<>(scheduledTask.getClients().values());
        if (candidates.isEmpty()) {
            log.info("任务[{}]没有客户端可以触发", scheduledTask.getTaskKey());
            return;
        }
        final StaticTaskEntity staticTaskEntity = staticTaskRepository.getByTaskKey(scheduledTask.getTaskKey());
        if (staticTaskEntity == null) {
            // The configuration can disappear before the periodic refresh drops the schedule.
            log.warn("Task [{}] has no stored configuration any more, skip firing", scheduledTask.getTaskKey());
            return;
        }
        if (!Boolean.TRUE.equals(staticTaskEntity.getEnabled())) {
            log.info("任务[{}]处于不触发状态", scheduledTask.getTaskKey());
            return;
        }
        // Singleton task: exactly one client is asked to run it.
        final ClientHandler randomClient = RandomUtil.randomEle(candidates);
        final LocalDateTime expectedFireTime = DateUtil.toLocalDateTime(expectFireTimeDate);
        final String executionKey = "Single:"
                + staticTaskEntity.getTaskKey()
                + "@"
                + DateUtil.format(expectedFireTime, "yyyyMMddHHmmss");
        final TaskExecutionEntity pending = new TaskExecutionEntity();
        pending.setStaticTask(staticTaskEntity);
        pending.setClientKey(randomClient.getClientKey());
        pending.setExecutionKey(executionKey);
        pending.setExecutionParam("{}");
        pending.setExpectedFireTime(expectedFireTime);
        pending.setStatus(TaskExecutionStatus.FIRE);
        final TaskExecutionEntity execution;
        try {
            execution = transactionalExecutor.execute(() -> taskExecutionRepository.save(pending));
        } catch (DataIntegrityViolationException e) {
            log.debug("数据库锁获取失败:{} 不执行任务。", executionKey, e);
            return;
        }
        final FireTaskContext context = new FireTaskContext();
        context.setTaskName(scheduledTask.getTaskName());
        context.setExpectFireTime(expectFireTimeDate);
        context.setExecutionId(execution.getId());
        context.setExecutionKey(execution.getExecutionKey());
        context.setExecutionParam(execution.getExecutionParam());
        try {
            checkStaticTaskIsExecuting(scheduledTask, staticTaskEntity);
            log.info("发起任务 {}, 客户端: {}", scheduledTask.getTaskKey(), randomClient.getClientKey());
            randomClient.fireTask(context);
            execution.setStatus(TaskExecutionStatus.FIRED);
        } catch (Exception e) {
            final String errorMsg0 = "任务调度失败, " + context + ", " + randomClient;
            log.error(errorMsg0, e);
            execution.setStatus(TaskExecutionStatus.FIRE_FAILED);
            execution.setMessage(buildFireFailureMessage(errorMsg0, e));
        }
        staticTaskEntity.setLastFireTime(expectFireTimeDate);
        staticTaskEntity.setNextFireTime(nextFireTime);
        staticTaskRepository.saveAndFlush(staticTaskEntity);
        try {
            taskExecutionRepository.saveAndFlush(execution);
        } catch (ObjectOptimisticLockingFailureException e) {
            log.trace("如果是更新状态失败， 那么有可能是任务先一步返回ACK了， 无所谓", e);
        }
    }

    /**
     * Builds the message stored for a failed firing: the summary, followed by as much of the stack
     * trace as still fits into {@link #FIRE_FAILURE_MESSAGE_LIMIT}.
     *
     * @param summary one-line description of the failure
     * @param e       the failure
     * @return the summary plus a line break and the (possibly shortened) stack trace, or only the
     * truncated summary when it already uses up the limit
     */
    private static String buildFireFailureMessage(String summary, Exception e) {
        final int traceBudget = FIRE_FAILURE_MESSAGE_LIMIT - summary.length();
        if (traceBudget <= 0) {
            // Never pass a non-positive limit on: Hutool treats a negative limit as "unlimited",
            // which would append the whole stack trace instead of cutting it.
            return summary.substring(0, FIRE_FAILURE_MESSAGE_LIMIT);
        }
        return summary + "\n" + ExceptionUtil.stacktraceToString(e, traceBudget);
    }

    /**
     * Rejects a firing while an earlier execution of the same task is still in status
     * {@code FIRED} or {@code EXECUTING}. Currently a no-op, see
     * {@link #RUNNING_EXECUTION_CHECK_ENABLED}.
     *
     * @param scheduledTask    schedule about to fire
     * @param staticTaskEntity stored configuration of the task
     * @throws IllegalStateException if the check is enabled and such an execution exists
     */
    private void checkStaticTaskIsExecuting(ScheduledTask scheduledTask, StaticTaskEntity staticTaskEntity) {
        // TODO make "do not fire while a previous execution is running" configurable.
        // An interrupted execution may stay stuck in a running status, so dead executions must be detected first.
        // TODO let the server periodically ask the client whether the execution is still running and,
        //  if it is not, mark the execution as lost (TaskExecutionStatus.MISSING).
        if (!RUNNING_EXECUTION_CHECK_ENABLED) {
            return;
        }
        final List<TaskExecutionEntity> executions =
                taskExecutionRepository.findByStaticTaskAndStatusIn(staticTaskEntity, Set.of(
                        TaskExecutionStatus.FIRED, TaskExecutionStatus.EXECUTING
                ));
        if (!executions.isEmpty()) {
            log.warn("有正在执行中的任务, 不能触发任务[{}], 正在执行中任务:{},", scheduledTask.getTaskKey(),
                     executions.stream().map(e -> e.getExecutionKey() + "@" + e.getClientKey())
                             .collect(Collectors.toList())
            );
            throw new IllegalStateException("任务[" + scheduledTask.getTaskKey() + "]正在执行中, 调度任务不触发");
        }
    }

    /**
     * Unregisters a client from every task. A task that is left without any client is cancelled
     * and removed.
     *
     * @param clientKey key of the client to remove
     */
    @Override
    public void unregister(String clientKey) {
        synchronized (lock) {
            final Iterator<ScheduledTask> tasks = scheduledTaskMap.values().iterator();
            while (tasks.hasNext()) {
                final ScheduledTask scheduledTask = tasks.next();
                scheduledTask.getClients().remove(clientKey);
                if (!scheduledTask.getClients().isEmpty()) {
                    continue;
                }
                log.info("取消注册任务 {}", scheduledTask.getTaskKey());
                cancelFuture(scheduledTask);
                tasks.remove();
            }
        }
    }

    /**
     * Cancels the scheduler future of a schedule, if it has one, without interrupting a firing
     * that is already running.
     *
     * @param scheduledTask schedule to stop
     */
    private static void cancelFuture(ScheduledTask scheduledTask) {
        if (scheduledTask.getFuture() != null) {
            scheduledTask.getFuture().cancel(false);
        }
    }

    /**
     * Updates the status (and optionally the message) of an execution. The update is retried when
     * it fails with an optimistic locking error, because client callbacks may arrive concurrently.
     *
     * @param executionId id of the execution
     * @param status      new status
     * @param message     message to store, {@code null} keeps the stored message
     * @throws NoSuchElementException                  if no execution with that id exists
     * @throws ObjectOptimisticLockingFailureException if every attempt hit an optimistic locking
     *                                                 error; later failures are attached as suppressed
     */
    @Override
    public void refreshExecutionStatus(Long executionId, TaskExecutionStatus status, String message) {
        log.info("Refresh execution status: {}, {}", executionId, status.name());
        int tryCount = 0;
        ObjectOptimisticLockingFailureException exception = null;
        do {
            try {
                transactionalExecutor.executeRunnable(() -> {
                    final TaskExecutionEntity taskExecutionEntity = taskExecutionRepository.findById(executionId)
                            .orElseThrow(() -> new NoSuchElementException(
                                    "Task execution not found: " + executionId));
                    taskExecutionEntity.setStatus(status);
                    if (message != null) {
                        taskExecutionEntity.setMessage(StrUtil.maxLength(message, 2000));
                    }
                    if (status == TaskExecutionStatus.EXECUTING) {
                        taskExecutionEntity.setClientAckTime(LocalDateTime.now());
                    }
                    if (status == TaskExecutionStatus.EXECUTED || status == TaskExecutionStatus.EXECUTE_FAILED) {
                        taskExecutionEntity.setCompletedTime(LocalDateTime.now());
                    }
                    taskExecutionRepository.saveAndFlush(taskExecutionEntity);
                });
                return;
            } catch (ObjectOptimisticLockingFailureException e) {
                if (exception == null) {
                    exception = e;
                } else {
                    exception.addSuppressed(e);
                }
                log.info("Try Failed! Refreshing execution status: {}, {}", executionId, status.name());
            }
        } while (tryCount++ < ServerConfig.OPTIMISTIC_LOCK_RETRY_COUNT);

        throw exception;
    }

    /**
     * Periodically compares the in-memory schedules with the stored task configuration:
     * schedules whose configuration is gone are cancelled and removed, schedules whose stored
     * config version is higher are either rescheduled (trigger expression changed, or no trigger
     * set) or updated in place (version and task name).
     */
    @Scheduled(
            cron = "0/${schedule.server.task-refresh-rate-seconds:5} * * * * ? "
    )
    @Transactional(rollbackFor = RuntimeException.class)
    public void refreshTaskScheduleConfig() {
        final Map<String, ScheduledTask> configNeedReschedule = new LinkedHashMap<>();
        final Set<String> configRemoved = new LinkedHashSet<>();
        // Only the keys captured here are evaluated below. A task registered after this snapshot is
        // not part of the query result and must not be mistaken for a removed configuration.
        final Set<String> taskKeys = new LinkedHashSet<>(scheduledTaskMap.keySet());
        if (!taskKeys.isEmpty()) {
            final Map<String, StaticTaskEntity> taskKeyTaskMap =
                    staticTaskRepository.findAllByTaskKeyIn(taskKeys).stream().collect(Collectors.toMap(
                            StaticTaskEntity::getTaskKey,
                            Function.identity()
                    ));
            synchronized (lock) {
                for (String taskKey : taskKeys) {
                    final ScheduledTask scheduleInfo = scheduledTaskMap.get(taskKey);
                    if (scheduleInfo == null) {
                        // The last client unregistered in the meantime.
                        continue;
                    }
                    final StaticTaskEntity staticTaskEntity = taskKeyTaskMap.get(taskKey);
                    if (staticTaskEntity == null) {
                        scheduledTaskMap.remove(taskKey);
                        cancelFuture(scheduleInfo);
                        configRemoved.add(taskKey);
                    } else if (staticTaskEntity.getConfigVersion() > scheduleInfo.getConfigVersion()) {
                        if (scheduleInfo.getTrigger() == null
                                || !Objects.equals(staticTaskEntity.getTriggerExpression(),
                                                   scheduleInfo.getTrigger().getExpression())) {
                            configNeedReschedule.put(taskKey, reschedule(taskKey, scheduleInfo, staticTaskEntity));
                        } else {
                            scheduleInfo.setConfigVersion(staticTaskEntity.getConfigVersion());
                            scheduleInfo.setTaskName(staticTaskEntity.getTaskName());
                        }
                    }
                }
            }
        }
        log.info("Refresh task schedule config, refreshed:{}, removed:{}", configNeedReschedule,
                 configRemoved
        );
    }

    /**
     * Replaces the schedule of a task by a new one built from the stored configuration. The new
     * schedule shares the client map of the old one. Must be called while holding {@link #lock}.
     *
     * @param taskKey          key of the task
     * @param oldSchedule      schedule currently stored for the task
     * @param staticTaskEntity stored configuration to schedule from
     * @return the schedule now stored for the task
     */
    private ScheduledTask reschedule(String taskKey, ScheduledTask oldSchedule, StaticTaskEntity staticTaskEntity) {
        final ScheduledTask updated = new ScheduledTask();
        BeanUtils.copyProperties(oldSchedule, updated);
        updated.setFuture(null);
        // Drop the copied trigger as well: if the new expression cannot be parsed the schedule must
        // not keep the old trigger, otherwise a later revert to the old expression would look like
        // "nothing changed" and the task would stay unscheduled.
        updated.setTrigger(null);
        updated.setTriggerContext(null);
        updated.setFireTime(null);
        updated.setConfigVersion(staticTaskEntity.getConfigVersion());
        updated.setTaskName(staticTaskEntity.getTaskName());
        updated.setClients(oldSchedule.getClients());
        // The old schedule is cancelled but deliberately left intact: cancel(false) lets a firing
        // that is already running finish, and that firing still reads the old schedule's state.
        cancelFuture(oldSchedule);
        scheduleTask(staticTaskEntity, updated);
        scheduledTaskMap.put(taskKey, updated);
        return updated;
    }
}
